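From our exploration over the past few days, we found that although NautilusTrader exposes a Python interface, its core computation is actually driven by Rust.
The benefit of this design is that developers get Python's ease of use while still drawing on Rust's performance and safety.
As for how NautilusTrader's architecture is evolving: their GitHub shows that the first v2 commit landed only on the 18th of last month, so the migration has just begun. This time we will therefore focus on introducing and using PyO3, first grasping the concepts through principles and an example, and then coming back to compare it with NautilusTrader v1's approach.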
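PyO3 is a library dedicated to letting Rust and Python call each other. It wraps the type conversions and calling conventions that the underlying FFI requires, which makes it a good fit for handing compute-intensive logic to Rust instead of leaving everything to Python.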
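Today we will walk through one example; the next few days will go into more depth.
First, install maturin. The page below lists the options, so pick whichever installation method you prefer: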
https://pyo3.rs/v0.25.1/getting-started.html
Once it is installed, create a new project folder.
Change into the project folder and run:
maturin init
When prompted for the bindings, choose pyo3.
Next, add the PyO3 crate:
cargo add pyo3
Edit Cargo.toml:
[package]
name = "vwap_pnl"
version = "0.1.0"
edition = "2021"

[lib]
name = "vwap_pnl"
crate-type = ["cdylib"]

[dependencies]
pyo3 = { version = "0.22", features = ["extension-module"] }
numpy = "0.22"

[profile.release]
codegen-units = 1
lto = "fat"
opt-level = 3
Write the Rust code (in src/lib.rs, the file that maturin init generates):
use std::collections::VecDeque;

use numpy::{PyArray1, PyReadonlyArray1};
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyModule};

/// Fill every order at the touch (buys at the ask, sells at the bid), then
/// return per-fill prices, FIFO realized PnL per fill, the overall VWAP and
/// the total realized PnL.
#[pyfunction]
fn compute_vwap_pnl<'py>(
    py: Python<'py>,
    bid: PyReadonlyArray1<'py, f64>,
    ask: PyReadonlyArray1<'py, f64>,
    side: PyReadonlyArray1<'py, i8>,  // 1=buy, -1=sell
    size: PyReadonlyArray1<'py, f64>, // >0
) -> PyResult<Bound<'py, PyDict>> {
    // zero-copy slice views (these fail if an array is not contiguous)
    let bid = bid.as_slice()?;
    let ask = ask.as_slice()?;
    let side = side.as_slice()?;
    let size = size.as_slice()?;

    let n = bid.len();
    if ask.len() != n || side.len() != n || size.len() != n {
        return Err(pyo3::exceptions::PyValueError::new_err("array length mismatch"));
    }

    let mut fill_price = vec![0.0f64; n];
    let mut pnl_per_fill = vec![0.0f64; n];
    let mut notional_sum = 0.0f64;
    let mut qty_sum = 0.0f64;

    // (qty, px): qty>0 long lot, qty<0 short lot
    let mut lots: VecDeque<(f64, f64)> = VecDeque::new();
    let mut realized_pnl = 0.0f64;

    for i in 0..n {
        let s = side[i] as i32;
        let q = size[i];
        if q <= 0.0 {
            // skip invalid sizes; fill_price[i] stays 0.0
            continue;
        }
        let px = if s > 0 { ask[i] } else { bid[i] };
        fill_price[i] = px;
        notional_sum += px * q;
        qty_sum += q;

        if s > 0 {
            // BUY: offset existing short lots first
            let mut remain = q;
            while remain > 0.0 {
                if let Some(&(lot_q, lot_px)) = lots.front() {
                    if lot_q < 0.0 {
                        let match_q = remain.min(-lot_q);
                        realized_pnl += (lot_px - px) * match_q; // short pnl
                        pnl_per_fill[i] += (lot_px - px) * match_q;
                        let new_q = lot_q + match_q; // lot_q<0 → toward 0
                        lots.pop_front();
                        if new_q != 0.0 {
                            lots.push_front((new_q, lot_px));
                        }
                        remain -= match_q;
                    } else {
                        break;
                    }
                } else {
                    break;
                }
            }
            if remain > 0.0 {
                lots.push_back((remain, px)); // open a new long lot
            }
        } else {
            // SELL: offset existing long lots first
            let mut remain = q;
            while remain > 0.0 {
                if let Some(&(lot_q, lot_px)) = lots.front() {
                    if lot_q > 0.0 {
                        let match_q = remain.min(lot_q);
                        realized_pnl += (px - lot_px) * match_q; // long pnl
                        pnl_per_fill[i] += (px - lot_px) * match_q;
                        let new_q = lot_q - match_q;
                        lots.pop_front();
                        if new_q != 0.0 {
                            lots.push_front((new_q, lot_px));
                        }
                        remain -= match_q;
                    } else {
                        break;
                    }
                } else {
                    break;
                }
            }
            if remain > 0.0 {
                lots.push_back((-remain, px)); // open a new short lot
            }
        }
    }

    let vwap = if qty_sum > 0.0 { notional_sum / qty_sum } else { 0.0 };

    // Build the return object with the Bound-style API
    let out = PyDict::new_bound(py);
    let fill_price_arr = PyArray1::from_vec_bound(py, fill_price);
    let pnl_per_fill_arr = PyArray1::from_vec_bound(py, pnl_per_fill);
    out.set_item("fill_price", fill_price_arr)?;
    out.set_item("pnl_per_fill", pnl_per_fill_arr)?;
    out.set_item("vwap", vwap)?;
    out.set_item("realized_pnl", realized_pnl)?;
    Ok(out)
}

#[pymodule]
fn vwap_pnl(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(compute_vwap_pnl, m)?)?;
    Ok(())
}
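The core of the function above is FIFO lot matching: each fill first closes the oldest open lots on the opposite side, and whatever quantity is left opens a new lot. Because a new lot is only appended once no opposite lots remain at the front, the queue always holds lots of a single sign. The following is a minimal pure-Python sketch of that same idea on a three-fill example. The name `fifo_realized_pnl` and the `(side, qty, price)` tuple layout are assumptions made for illustration and are not part of the extension module.

```python
from collections import deque


def fifo_realized_pnl(fills):
    """fills: iterable of (side, qty, price); side is +1 for buy, -1 for sell.

    Hypothetical helper for illustration only.
    """
    lots = deque()  # open lots as (signed_qty, price), oldest first
    realized = 0.0
    for side, qty, px in fills:
        remain = qty
        # Match against the oldest lot while it sits on the opposite side.
        while remain > 0 and lots and lots[0][0] * side < 0:
            lot_q, lot_px = lots.popleft()
            match_q = min(remain, abs(lot_q))
            # Closing a long earns (px - lot_px); closing a short earns (lot_px - px).
            direction = 1 if lot_q > 0 else -1
            realized += (px - lot_px) * match_q * direction
            left = abs(lot_q) - match_q
            if left > 0:
                # Put the unmatched part of the lot back at the front.
                lots.appendleft((direction * left, lot_px))
            remain -= match_q
        if remain > 0:
            # Leftover quantity opens a new lot on this fill's side.
            lots.append((side * remain, px))
    return realized, list(lots)


# Buy 2 @ 100, buy 1 @ 110, then sell 2.5 @ 120.
realized, open_lots = fifo_realized_pnl(
    [(1, 2.0, 100.0), (1, 1.0, 110.0), (-1, 2.5, 120.0)]
)
print(realized, open_lots)  # 45.0 [(0.5, 110.0)]
```

The sell closes the whole first lot for (120 - 100) * 2 = 40 and half of the second for (120 - 110) * 0.5 = 5, leaving 0.5 long at 110.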
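Build the project and install the package so that Python can call it. Plain maturin develop produces a debug build, so pass --release to make the [profile.release] settings above take effect:
maturin develop --release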
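After the build, it can help to confirm that the extension landed in the interpreter you are about to run the benchmark with, since maturin installs into the currently active environment. A minimal sketch, assuming the module name `vwap_pnl` from the Cargo.toml above; `extension_available` is a hypothetical helper, not part of maturin or PyO3:

```python
import importlib.util


def extension_available(name: str = "vwap_pnl") -> bool:
    """Return True if a top-level module called `name` can be found."""
    return importlib.util.find_spec(name) is not None


if extension_available():
    import vwap_pnl

    # The Rust #[pyfunction] should show up as a normal module attribute.
    print("compute_vwap_pnl exported:", hasattr(vwap_pnl, "compute_vwap_pnl"))
else:
    print("vwap_pnl not found; build it inside this environment first")
```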
Write a test script:
import time
from collections import deque

import numpy as np
import vwap_pnl

# Generate a batch of synthetic data
N = 1_000_000
rng = np.random.default_rng(7)
mid = 30000 + 1000 * rng.standard_normal(N)
spr = np.clip(rng.normal(5, 1, N), 0.1, None)
bid = (mid - spr/2).astype(np.float64)
ask = (mid + spr/2).astype(np.float64)
side = rng.choice([1, -1], size=N, p=[0.5, 0.5]).astype(np.int8)
size = np.abs(rng.normal(0.01, 0.003, N)).astype(np.float64)

# PyO3
t0 = time.perf_counter()
out = vwap_pnl.compute_vwap_pnl(bid, ask, side, size)
t1 = time.perf_counter()
print("[PyO3] vwap:", out["vwap"], "realized_pnl:", out["realized_pnl"], "time:", round(t1-t0, 4), "s")

# Reference: pure Python (slow)
def py_compute(bid, ask, side, size):
    # Skipped fills keep price 0.0 and are left out of the VWAP,
    # matching the Rust version.
    fill_price = np.zeros_like(bid)
    pnl_per = np.zeros_like(bid)
    realized = 0.0
    notional_sum = 0.0
    qty_sum = 0.0
    lots = deque()  # (qty, px): qty > 0 long lot, qty < 0 short lot
    for i in range(len(bid)):
        s = int(side[i])
        q = float(size[i])
        if q <= 0:
            continue
        px = ask[i] if s > 0 else bid[i]
        fill_price[i] = px
        notional_sum += px * q
        qty_sum += q
        if s > 0:
            # BUY: offset existing short lots first
            remain = q
            while remain > 0 and lots and lots[0][0] < 0:
                lot_q, lot_px = lots[0]
                match_q = min(remain, -lot_q)
                realized += (lot_px - px) * match_q
                pnl_per[i] += (lot_px - px) * match_q
                lot_q += match_q
                lots.popleft()
                if lot_q != 0:
                    lots.appendleft((lot_q, lot_px))
                remain -= match_q
            if remain > 0:
                lots.append((remain, px))
        else:
            # SELL: offset existing long lots first
            remain = q
            while remain > 0 and lots and lots[0][0] > 0:
                lot_q, lot_px = lots[0]
                match_q = min(remain, lot_q)
                realized += (px - lot_px) * match_q
                pnl_per[i] += (px - lot_px) * match_q
                lot_q -= match_q
                lots.popleft()
                if lot_q != 0:
                    lots.appendleft((lot_q, lot_px))
                remain -= match_q
            if remain > 0:
                lots.append((-remain, px))
    vwap = float(notional_sum / qty_sum) if qty_sum > 0 else 0.0
    return {"fill_price": fill_price, "pnl_per_fill": pnl_per, "vwap": vwap, "realized_pnl": float(realized)}

t0 = time.perf_counter()
out_py = py_compute(bid, ask, side, size)
t1 = time.perf_counter()
print("[PurePy] vwap:", out_py["vwap"], "realized_pnl:", out_py["realized_pnl"], "time:", round(t1-t0, 4), "s")
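The script prints both results but leaves it to the reader to eyeball whether they agree. A small check along the following lines could make that explicit. It is only a sketch: `results_match` is a hypothetical helper, the tolerances are arbitrary assumptions, and the two dicts are stand-ins with the same keys as the script's `out` and `out_py`.

```python
import math


def results_match(a, b, rel_tol=1e-9, abs_tol=1e-9):
    """Compare the scalar and per-fill fields of two result dicts."""
    for key in ("vwap", "realized_pnl"):
        if not math.isclose(a[key], b[key], rel_tol=rel_tol, abs_tol=abs_tol):
            return False
    for key in ("fill_price", "pnl_per_fill"):
        xs, ys = list(a[key]), list(b[key])
        if len(xs) != len(ys):
            return False
        # Note: math.isclose treats NaN as unequal to everything, itself included.
        if not all(
            math.isclose(x, y, rel_tol=rel_tol, abs_tol=abs_tol)
            for x, y in zip(xs, ys)
        ):
            return False
    return True


# Stand-in results; in the benchmark these would be `out` and `out_py`.
rust_like = {"vwap": 100.5, "realized_pnl": 45.0,
             "fill_price": [100.0, 110.0, 120.0], "pnl_per_fill": [0.0, 0.0, 45.0]}
py_like = {"vwap": 100.5, "realized_pnl": 45.0 + 1e-12,
           "fill_price": [100.0, 110.0, 120.0], "pnl_per_fill": [0.0, 0.0, 45.0]}

print(results_match(rust_like, py_like))
```

With a million fills the element-wise loop is itself slow; since the script already depends on NumPy, `np.allclose` on the two arrays would be the faster choice there.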
The script's output is shown below; the PyO3 version is clearly faster.
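This example confirms PyO3's advantage in performance-critical computation. Using Python as the outer interface and Rust as the core engine lets a trading system stay flexible while remaining competitive in markets where latency is measured in milliseconds.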
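Since that conclusion rests on a single timed run of each implementation, a slightly more robust measurement could take the best of several runs, which reduces the influence of one-off noise such as cache warm-up. The helper below is a sketch under that assumption. `best_of` and `slow_sum` are hypothetical names; in the benchmark the workload would be `vwap_pnl.compute_vwap_pnl` or `py_compute`.

```python
import time


def best_of(fn, *args, repeats=5):
    """Call fn(*args) `repeats` times; return (fastest_seconds, last_result)."""
    best = float("inf")
    result = None
    for _ in range(repeats):
        t0 = time.perf_counter()
        result = fn(*args)
        best = min(best, time.perf_counter() - t0)
    return best, result


def slow_sum(n):
    """Stand-in workload: sum of squares below n."""
    return sum(i * i for i in range(n))


elapsed, value = best_of(slow_sum, 10_000, repeats=3)
print(f"best of 3: {elapsed:.6f} s")
```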
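Over the next few days we will first give a brief introduction to Rust, then build a simple trading system in Python, and afterwards strengthen the parts of it that Rust can optimize. Once we understand the architecture of a trading system well enough, we will explain the design of NautilusTrader's v1 architecture.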